package com.atguigu.flink0922.chapter02;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Word count over a bounded (file) source, built with the Flink DataStream API.
 *
 * <p>The job reads {@code input/words.txt} as lines of text, splits every line on
 * single spaces, maps each word to a {@code (word, 1L)} tuple, keys the stream by
 * the word, sums tuple field 1 per key, and prints the resulting stream.
 *
 * <p>Created: 2021/3/1 10:14
 *
 * @author lizhenchao@atguigu.cn
 */
public class BoundedWordCount {
    /**
     * Builds the word-count pipeline and runs it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Create the source stream: the lines of the input file.
        final DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");

        // 3. Transform the stream.
        // 3a. Split every line into words.
        final SingleOutputStreamOperator<String> wordStream = lineStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String word : value.split(" ")) {
                    // String.split(" ") yields "" for blank lines, leading spaces and
                    // runs of consecutive spaces; those are not words, so drop them
                    // instead of counting an empty-string "word".
                    if (word.isEmpty()) {
                        continue;
                    }
                    out.collect(word);
                }
            }
        });

        // 3b. Pair every word with an initial count of 1.
        final SingleOutputStreamOperator<Tuple2<String, Long>> wordOneStream = wordStream.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String word) throws Exception {
                return Tuple2.of(word, 1L);
            }
        });

        // 3c. Key the stream by the word itself (tuple field f0).
        final KeyedStream<Tuple2<String, Long>, String> keyedStream = wordOneStream.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> value) throws Exception {
                return value.f0;
            }
        });

        // 3d. Sum the count (tuple position 1) per key.
        final SingleOutputStreamOperator<Tuple2<String, Long>> result = keyedStream.sum(1);

        // 4. Emit the result to standard output.
        result.print();

        // 5. Trigger execution of the job.
        env.execute();
    }
}
